package com.lemon.core.conf;

import com.lemon.core.conf.core.ZooKeeperRetry;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;

/**
 * @version 1.0
 */
@SuppressWarnings({"unchecked"})
public class ZKClient {
    private static String zkHostPattern = "^((25[0-5]|2[0-4]\\d|1\\d{2}|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d{2}|[1-9]?\\d)){3}(:\\d{1,5}){0,1},)*((25[0-5]|2[0-4]\\d|1\\d{2}|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d{2}|[1-9]?\\d)){3}(:\\d{1,5}){0,1})$";
    private final static String debugHost = "localhost:2181";
    private final static String devHost = "";
    private final static String testHost = "";
    // 2017-04-19 更改生产Zookeeper地址
    private final static String prodHost = "";


    private volatile ZooKeeper zooKeeper;
    static private ZKClient CLIENT;

    private ExecutorService watchExecutorService = Executors.newCachedThreadPool();
    private ConcurrentMap<Object, WatchBag> watches = new ConcurrentHashMap();
    private Map<String, String> ephemeralNodes = new ConcurrentHashMap();
    private List<RetryRun> tries = Collections
            .synchronizedList(new LinkedList());
    private List<RetryRun> watchTries = Collections
            .synchronizedList(new LinkedList());

    private String[] connectAddress = null;

    final Watcher defaultWatch = new Watcher() {
        public void process(WatchedEvent e) {
//            log.info(e.getPath(), "", "");
            if (e.getState() == Watcher.Event.KeeperState.Expired) {

                handleExpired();
            } else if (e.getState() == Watcher.Event.KeeperState.Disconnected) {
//                log.warn("ZKDisconnected! {}", e.toString(), "");
            } else if (e.getState() == Watcher.Event.KeeperState.SyncConnected) {
//                log.warn("ZKSyncConnected! {}", e.toString(), "");
                ZKClient.this.handleSynConnected();
            }

        }
    };

    private static interface RetryRun {
        public void run() throws KeeperException, InterruptedException;
    }

    private static class WatchBag {
        WatchBag(Object watch, String path, String payload) {
            this.path = path;
            this.payload = payload;
            this.watch = watch;
        }

        private String path;
        private String payload;
        volatile Object watch;

        private long lastid = -999;

    }

    public static abstract class ChildrenWatcher {

        final private Set<String> children = new HashSet<String>();

        /**
         * can do long task in this callback
         *
         * @param node
         */
        public abstract void nodeAdded(String node);

        /**
         * can do long task in this callback
         *
         * @param node
         */
        public abstract void nodeRemoved(String node);

        public boolean triggerInNewThread() {
            return true;
        }

        final synchronized public List<String> getChildren() {
            return new ArrayList<String>(this.children);
        }

        synchronized final private void childrenCome(List<String> c) {
            for (String s : c) {

                if (this.children.contains(s)) {
                    continue;
                } else {
                    this.children.add(s);
                    final String fs = s;
                    if (this.triggerInNewThread()) {
                        ZKClient.get().watchExecutorService
                                .submit(new Runnable() {

                                    public void run() {
                                        try {
                                            nodeAdded(fs);
                                        } catch (Throwable e) {
//                                            log.error("Error while submit node added:" + fs, e);
                                        }
                                    }
                                });
                    } else {
                        try {
                            nodeAdded(fs);
                        } catch (Throwable e) {
//                            log.error("Error while node added:" + fs, e);
                        }
                    }

                }
            }
            List<String> toRemove = new ArrayList<String>();
            for (String s : this.children) {
                if (c.contains(s)) {
                    continue;
                } else {
                    toRemove.add(s);
                }
            }
            for (String s : toRemove) {
                this.children.remove(s);
                final String fs = s;
                if (this.triggerInNewThread()) {
                    ZKClient.get().watchExecutorService.submit(new Runnable() {

                        public void run() {
                            try {
                                nodeRemoved(fs);

                            } catch (Exception e) {
//                                log.error("Error while node removed:{}" + fs, e);

                            }
                        }
                    });
                } else {
                    try {
                        nodeRemoved(fs);

                    } catch (Exception e) {
//                        log.error("Error while node removed:{}" + fs, e);

                    }
                }

            }

        }

    }

    public static interface LongValueWatcher {
        void valueChanged(long l);

    }

    public static interface StringValueWatcher {
        void valueChanged(String l);
    }

    public static interface MasterWatcher {
        /**
         * Master变更为该server，通过比较与本server是否相同判断自己是否master,
         * 如果调用时给出的servername为null，这里的参数就可以为null
         *
         * @param serverName
         */
        void masterChangeTo(String serverName);

        void exceptionCaught(Throwable t);
    }

    private static class InterMasterWatcher {
        final WatchBag watch;
        private String old;
        private String path;

        public InterMasterWatcher(WatchBag watch) {
            this.watch = watch;

        }

        private void masterChangeTo(final String serverName) {

            MasterWatcher mw = (MasterWatcher) watch.watch;
            if (mw != null) {
                mw.masterChangeTo(serverName);
            }

        }

        private void setMaster(String name) {

            if (old != null && old.equals(name)) {
//                log.debug("It's same,Not set Master:{},path:{}", name, path);
            } else {
                old = name;
//                log.debug("Setting Master:{},path:{}", name, path);
                this.masterChangeTo(name);

            }

        }

    }

    private ZKClient() {
        try {
            this.init();
        } catch (IOException ex) {
//            log.error("Error connect ZK", ex);
        }
    }

    private ZKClient(String quorum) {
        try {
            this.init(quorum);
        } catch (IOException e) {
//            log.error("Error connect ZK", e);
        }
    }

    /**
     * <p>
     * 本类为单例，一旦调用就没法改变，如果你需要指定自己的zk配置路径，在使用本类之前先调用BaseConfig.setConfigFrom()
     * </p>
     *
     * @return
     */
    public static ZKClient get() {
        if (CLIENT == null) {
            synchronized (ZKClient.class) {
                CLIENT = new ZKClient();
            }
        }
        return CLIENT;
    }

    public static ZKClient get(String quorum) {
        if (CLIENT == null) {
            synchronized (ZKClient.class) {
                CLIENT = new ZKClient(quorum);
            }
        }
        return CLIENT;
    }

    private void init(String quorum) throws IOException {
        zooKeeper = new ZooKeeperRetry(quorum,
                new Integer(BaseConfig.getValue("zk.session.timeout", "30000")),
                defaultWatch);
        this.connectAddress = quorum.split(";");
    }

    private void init() throws IOException {
        // zooKeeper = new ZooKeeper(SystemConfig.getProperty("zk.quorum"),
        // Integer.parseInt(SystemConfig.getProperty("zk.session.timeout",
        // "30000")), defaultWatch);
        String zkQuorum = BaseConfig.getValue("zk.quorum", "");

        List<String> zkHostList = new ArrayList<String>();

        if (Pattern.matches(zkHostPattern, zkQuorum)) {
//            log.debug("use zk.quorum hosts.....:" + zkQuorum);
            connectAddress = zkQuorum.split(",");
        } else {
            if (zkQuorum.toLowerCase().equals("dev")) {
//                log.debug("use dev hosts.....");
                connectAddress = devHost.split(",");
            } else if (zkQuorum.toLowerCase().equals("prod")) {
//                log.debug("use prod hosts.....");
                connectAddress = prodHost.split(",");
            } else if (zkQuorum.toLowerCase().equals("debug")) {
//                log.debug("use debug hosts.....");
                connectAddress = debugHost.split(",");
            } else {
//                log.debug("use test hosts.....");
                connectAddress = testHost.split(",");
            }

        }
        Collections.addAll(zkHostList, connectAddress);
        Collections.shuffle(zkHostList);
        // zooKeeper = new ZooKeeper(StringUtils.join(zkHostList, ","), new
        // Integer(BaseConfig.getValue("zk.session.timeout", "30000")),
        // defaultWatch);
        zooKeeper = new ZooKeeperRetry(
                StringUtils.join(zkHostList, ","),
                new Integer(BaseConfig.getValue("zk.session.timeout", "30000")),
                defaultWatch);
    }

    private void handleSynConnected() {
        for (RetryRun r : this.tries) {
            try {
                r.run();
            } catch (Exception e) {
//                log.error("Error while handleSynConnected", e);
            }
        }
        this.tries.clear();
        for (RetryRun r : this.watchTries) {
            try {
                r.run();
            } catch (Exception e) {
//                log.error("Error while handleSynConnected", e);
            }
        }
        this.watchTries.clear();
    }

    /**
     * 当到ZK的连接超时触发 TODO:
     */
    private synchronized void handleExpired() {

        if (this.zooKeeper.getState() == ZooKeeper.States.CONNECTED) {
//            log.warn("Already handling ZK Expired", "", "");
            return;
        }
        try {
            this.zooKeeper.close();

        } catch (Exception e) {
//            log.error("Error while close old zk", e);
        }
//        log.info("Handling ZK Expired", "", "");

        try {
            this.init();
            this.watchTries.clear();

        } catch (Exception e) {
//            log.error("Error while init ZK ,in handle expored", e);
        }

        for (Map.Entry<String, String> e : this.ephemeralNodes.entrySet()) {
            try {

                this.registerEphemeralNode(e.getKey(), e.getValue());
            } catch (KeeperException.ConnectionLossException ex) {
                final String ek = e.getKey();
                final String ev = e.getValue();
                this.tries.add(new RetryRun() {
                    public void run() throws KeeperException,
                            InterruptedException {
                        registerEphemeralNode(ek, ev);
                    }
                });
            } catch (Exception f) {
//                log.warn("Error while reregisterEphemeralNode,handleExpired",
//                        "", "");
            }
        }
        try {

            for (WatchBag bag : this.watches.values()) {
                if (bag.watch == null) {
                    continue;
                }
                this.addWatch(bag);
            }
        } catch (Exception e) {
//            log.error("Error while init ZK ,in handle expored", e);
        }

    }

    public String dump(String path, int tabCount) throws InterruptedException,
            KeeperException {
        StringBuilder sb = new StringBuilder();
        List<String> cl = this.getChildren(path);

        for (String c : cl) {
            String cp = path + (path.endsWith("/") ? "" : "/") + c;
            for (int i = 0; i < tabCount; i++) {
                sb.append("\t");
            }
            // modified by junsen_ye 20100928 节点数据为null时，此处抛异常
            byte[] data = this.getData(cp);
            String info = "";
            if (null != data && data.length > 0) {
                info = new String(data);
            }
            // end modified
            sb.append(c).append("{").append(info).append("}\n");
            sb.append(this.dump(cp, tabCount + 1));

        }
        return sb.toString();
    }

    public void unRegisterEphemeralNode(final String path, String hostname)
            throws KeeperException, InterruptedException {
        this.ephemeralNodes.remove(path);
        final String fullpath = path + "/" + hostname;
        try {
            this.forceDelete(fullpath);
        } catch (KeeperException.ConnectionLossException e) {
//            log.warn("ConnectionLossException:{},{}", path, hostname);
            this.tries.add(new RetryRun() {
                public void run() throws KeeperException, InterruptedException {
                    if (!ephemeralNodes.containsKey(path)) {
                        forceDelete(fullpath);
                    }
                }
            });

        }
    }

    public void registerEphemeralNode(String path, String nodename)
            throws KeeperException, InterruptedException {
        // this.createIfNotExist(path);
        if (nodename == null) {
            throw new NullPointerException("hostname is null,path:"
                    + path);
        }

        String sb = new StringBuilder(path).append("/").append(nodename)
                .toString();
        try {
            this.zooKeeper.create(sb, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
        } catch (KeeperException.NodeExistsException ke) {
//            log.warn("Node {} exisit,delete and create again", sb, "");

            this.forceDelete(sb);

            this.zooKeeper.create(sb, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);

        }

        ephemeralNodes.put(path, nodename);

    }

    /**
     * whill stackoverflow? need check
     *
     * @param path
     * @param cw
     */
    public void watchChildren(final String path, ChildrenWatcher cw) {
        WatchBag bag = new WatchBag(cw, path, null);
        this.watches.put(cw, bag);
        this.__watchChildren(bag);
    }

    private void __watchChildren(final WatchBag bag) {
        ChildrenWatcher cw = (ChildrenWatcher) bag.watch;
        if (cw == null) {
            return;
        }
        final Watcher w = new Watcher() {

            public void process(WatchedEvent we) {

                if (we.getState() == Event.KeeperState.Expired) {

                    handleExpired();
                } else if (we.getType() == Event.EventType.NodeChildrenChanged) {

                    __watchChildren(bag);

                } else if (we.getType() == Event.EventType.NodeCreated) {
                    __watchChildren(bag);
                } else if (we.getType() == Event.EventType.NodeDeleted) {
                    __watchChildren(bag);
                } else if (we.getType() == Event.EventType.NodeDataChanged) {
//                    log.warn("watch childern NodeDataChanged:{}", bag.path, "");

                }

            }
        };

        try {
            Stat stat = new Stat();
            List<String> result = zooKeeper.getChildren(
                    bag.path, w, stat);
            if (bag.lastid == stat.getCversion()) {
//                log.warn("children watch get a dupmxxid:{}", bag.lastid, "");
                return;
            }
            bag.lastid = stat.getCversion();
            cw.childrenCome(result);
        } catch (KeeperException.ConnectionLossException e) {
//            log.debug("watch children  ConnectionLossException{}", bag.path);
            this.watchTries.add(new RetryRun() {
                public void run() throws KeeperException, InterruptedException {
                    if (bag.watch != null)
                        zooKeeper.getChildren(bag.path, w, null);
                }
            });
        } catch (KeeperException e) {
//            log.error("watch children KeeperException:{}" + bag.path, e);
        } catch (Exception e) {
//            log.error("watch children Exception:{}" + bag.path, e);
        }
    }

    public void watchStrValueNode(String path, StringValueWatcher watch) {
        WatchBag bag = new WatchBag(watch, path, null);
        this.watches.put(watch, bag);
        this.__watchStrValueNode(bag);
    }

    private String __watchStrValueNode(final WatchBag bag) {
        StringValueWatcher lw = (StringValueWatcher) bag.watch;
        if (lw == null) {
            return null;
        }

        final Watcher w = new Watcher() {
            public void process(org.apache.zookeeper.WatchedEvent we) {
                if (we.getState() == Event.KeeperState.Expired) {
                    handleExpired();
                } else if (we.getType() == Event.EventType.NodeDeleted) {

                } else if (we.getType() == Event.EventType.NodeCreated) {
                    if (bag.watch != null)
                        __watchStrValueNode(bag);
                } else if (we.getType() == Event.EventType.NodeDataChanged) {
                    final StringValueWatcher lw = (StringValueWatcher) bag.watch;
                    if (lw != null) {
                        final String s = __watchStrValueNode(bag);
                        if (s != null) {
                            new Thread(new Runnable() {
                                public void run() {
                                    lw.valueChanged(s);
                                }
                            }, "StringWatcher-trigger-thread").start();
                        }
                    }
                }
            }
        };

        try {
            Stat stat = new Stat();
            String ss = new String(zooKeeper.getData(bag.path, w, stat));
            if (bag.lastid == stat.getMzxid()) {
//                log.warn("str watch get a dupmxxid:{}", bag.lastid, "");
                return null;
            }
            bag.lastid = stat.getMzxid();
            return ss;

        } catch (KeeperException.ConnectionLossException e) {
//            log.debug("Str watch ConnectionLossException:{}", bag.path);
            this.watchTries.add(new RetryRun() {
                public void run() throws KeeperException, InterruptedException {
                    if (bag.watch != null)
                        zooKeeper.getData(bag.path, w, null);
                }
            });

        } catch (KeeperException e) {
//            log.error(" watchStrValueNode KeeperException:{}" + bag.path, e);
        } catch (Exception e) {
//            log.error("watchStrValueNode Exception:{}" + bag.path, e);
        }
        return null;
    }

    /**
     * @param path
     * @param watch
     * @return
     */
    public long watchLongValueNode(final String path,
                                   final LongValueWatcher watch) {
        WatchBag bag = new WatchBag(watch, path, null);
        this.watches.put(watch, bag);
        return this.__watchLongValueNode(bag);
    }

    private long __watchLongValueNode(final WatchBag bag)

    {
        LongValueWatcher lw = (LongValueWatcher) bag.watch;
        if (lw == null) {
            return Long.MAX_VALUE;
        }

        final Watcher w = new Watcher() {
            public void process(org.apache.zookeeper.WatchedEvent we) {
                // System.out.println(we);
                if (we.getState() == Event.KeeperState.Expired) {
                    handleExpired();
                } else if (we.getType() == Event.EventType.NodeDeleted) {
//                    log.warn("NodeDeleted:{}", bag.path, "");
                } else if (we.getType() == Event.EventType.NodeCreated) {

                    if (bag.watch != null) {
                        __watchLongValueNode(bag);
                    }

                } else if (we.getType() == Event.EventType.NodeDataChanged) {
                    final LongValueWatcher lw = (LongValueWatcher) bag.watch;
                    if (lw != null) {
                        final long lv = __watchLongValueNode(bag);
                        if (lv != Long.MAX_VALUE) {
                            new Thread(new Runnable() {
                                public void run() {
                                    lw.valueChanged(lv);
                                }
                            }, "LongWatcher-trigger-thread").start();
                        }
                    }
                }
            }
        };

        try {
            Stat stat = new Stat();
            String s = new String(this.zooKeeper.getData(bag.path, w, stat));
            if (bag.lastid == stat.getMzxid()) {
//                log.warn("long watch get a dupmxxid:{}", bag.lastid, "");
                return Long.MAX_VALUE;
            }
            bag.lastid = stat.getMzxid();
            long lv = Long.parseLong(s);

            return lv;

        } catch (KeeperException.ConnectionLossException e) {
//            log.debug("ConnectionLossException{}", bag.path);
            this.watchTries.add(new RetryRun() {
                public void run() throws KeeperException, InterruptedException {
                    if (bag.watch != null)
                        zooKeeper.getData(bag.path, w, null);
                }
            });

        } catch (KeeperException e) {
//            log.error("KeeperException:{}" + bag.path, e);
        } catch (InterruptedException e) {
//            log.error("watchTimeNode InterruptedException:{}" + bag.path, e);
        } catch (NumberFormatException e) {
//            log.error("watchTimeNode NumberFormatException:{}" + bag.path, e);
        } catch (Exception e) {
//            log.error("watchTimeNode Exception:{}" + bag.path, e);
        }
        return Long.MAX_VALUE;

    }

    public void addWatch(WatchBag bag) {
        Object watch = bag.watch;
        if (watch instanceof ZKClient.MasterWatcher) {
            this.watchMaster(bag.path, bag.payload, (MasterWatcher) watch);
        } else if (watch instanceof ZKClient.LongValueWatcher) {
            this.watchLongValueNode(bag.path, (LongValueWatcher) watch);
        } else if (watch instanceof ZKClient.ChildrenWatcher) {
            watchChildren(bag.path, (ChildrenWatcher) watch);
        } else if (watch instanceof ZKClient.StringValueWatcher) {
            this.watchStrValueNode(bag.path, (StringValueWatcher) watch);
        } else {
            throw new RuntimeException("Unknown Watch Type," + watch.getClass()
                    + ":" + watch.toString() + " path:" + bag.path);
        }

    }

    public void destoryWatch(Object watch) {
        if (watch == null) {
            return;
        }
        WatchBag ar = this.watches.remove(watch);
        if (ar != null) {
            ar.watch = null;// (null);

        }
        if (watch instanceof ChildrenWatcher
                || watch instanceof LongValueWatcher
                || watch instanceof MasterWatcher
                || watch instanceof StringValueWatcher

                ) {
        } else {
            throw new RuntimeException(watch.getClass() + "Not a valid watch");
        }

    }

    public void unwatchMaster(final String path, final String serverAddress,
                              final MasterWatcher watch) {
        this.destoryWatch(watch);
        if (serverAddress != null) {
            this.deleteMaster(path, serverAddress);
        }
    }

    private void deleteMaster(final String path, final String serverAddress) {
        if (serverAddress != null) {
            try {
                byte[] master = this.zooKeeper.getData(path, false, null);
                String m = new String(master);
                if (m.equals(serverAddress)) {
                    zooKeeper.delete(path, -1);
                }

            } catch (KeeperException.NoNodeException e) {
                // it's ok
            } catch (KeeperException.ConnectionLossException e) {
                // log.warn("ConnectionLossException while deleteMaster: path{},address:{}",e,
                // path,serverAddress);
//                log.warn(
//                        "ConnectionLossException while deleteMaster: path{},address:{}",
//                        path, serverAddress);
                this.tries.add(new RetryRun() {
                    public void run() {
//                        log.debug("RetryRundeleteMaster: path{},address:{}",
//                                path, serverAddress);

                        deleteMaster(path, serverAddress);
                    }
                });
            } catch (Exception e) {
//                log.error("Error while deleteMaster (line 1014): path{},address:{}",
//                        path + serverAddress,
//                        e
//                );
            }
        }
    }

    public void watchMaster(String path, String serverAddress,
                            MasterWatcher watch) {

        this.deleteMaster(path, serverAddress);

        WatchBag bag = new WatchBag(watch, path, serverAddress);
        this.watches.put(watch, bag);

        __watchMaster(bag);

    }

    /**
     * @param ：服务器名，如果null，只watch，不set
     * @param bag
     */
    final private void __watchMaster(final WatchBag bag)
    // throws KeeperException, InterruptedException
    {
        MasterWatcher watch = (MasterWatcher) bag.watch;
        if (watch == null) {
            return;
        }

        final InterMasterWatcher iw = new InterMasterWatcher(bag);
        iw.path = bag.path;
        final Watcher w = new Watcher() {
            public void process(org.apache.zookeeper.WatchedEvent we) {
                if (we.getState() == Event.KeeperState.Expired) {
                    handleExpired();
                } else if (we.getType() == Event.EventType.NodeDeleted) {

                    if (bag.payload == null) {
                        iw.setMaster(null);
                    }
                    watchExecutorService.submit(new Runnable() {
                        public void run() {
                            try {
                                Thread.sleep(300);
                            } catch (InterruptedException e) {
                            }
                            __watchMaster(bag);
                        }
                    });
                    // 否则不能删除Cluster

                } else if (we.getType() == Event.EventType.NodeCreated) {
                    __watchMaster(bag);
                } else if (we.getType() == Event.EventType.NodeDataChanged) {
                    __watchMaster(bag);

//                    log.error("Node data should not change");
                }
            }
        };

        try {

            try {
                Stat stat = new Stat();
                byte[] master = this.zooKeeper.getData(bag.path, false, stat);
                if (stat.getMzxid() == bag.lastid) {
//                    log.warn("master watch Get a dupmxxid:{}", bag.lastid, "");
                }
                bag.lastid = stat.getMzxid();

                iw.setMaster(new String(master));
            } catch (KeeperException.NoNodeException e) {
                String serverAddress = bag.payload;
                if (serverAddress != null) {
                    try {
                        this.zooKeeper.create(bag.path,
                                serverAddress.getBytes(),
                                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.EPHEMERAL);
                        iw.setMaster(serverAddress);

                    } catch (KeeperException.NodeExistsException ee) {
                        // 刚才没有，现在有了，zk会触发的，所以忽略
                    } catch (KeeperException.NoNodeException ee) {
                        // 没有父节点，在删除schema
                        watch.exceptionCaught(ee);
                    }

                } else {
                    iw.setMaster(null);
                }
            }

        } catch (Exception e) {
//            log.error("Error while get Master", e);
            watch.exceptionCaught(e);

        }
        try {
            this.zooKeeper.exists(bag.path, w);
        } catch (KeeperException.ConnectionLossException e) {
            this.watchTries.add(new RetryRun() {
                public void run() throws KeeperException, InterruptedException {
                    zooKeeper.exists(bag.path, w);
                }
            });
        } catch (Exception ex) {
//            log.warn("ZK Error while watch Master", "", "");
        }

    }

    public void forceDelete(String path) throws KeeperException,
            InterruptedException {
        try {
            List<String> ss = this.getChildren(path);

            for (String s : ss) {
                this.forceDelete(path + "/" + s);
            }

            this.zooKeeper.delete(path, -1);
        } catch (KeeperException.NoNodeException e) {
//            log.error("forceDelete erro.", e);
        }

    }

    public boolean existed(String path) throws KeeperException, InterruptedException {
        return zooKeeper.exists(path, false) != null;
    }


    public List<String> getChildren(String path) throws InterruptedException,
            KeeperException {
        return zooKeeper.getChildren(path, false);

    }

    public byte[] getData(String path) throws KeeperException,
            InterruptedException {
        return zooKeeper.getData(path, false, null);
    }

    public void createIfNotExist(String p) throws KeeperException,
            InterruptedException {
        createIfNotExist(p, CreateMode.PERSISTENT);
    }

    public void createIfNotExist(String path, CreateMode mode) throws KeeperException, InterruptedException {
        String[] paths = path.split("/");
        String p = "";

        for (String dir : paths) {
            if (dir.equals("")) {
                continue;
            }
            p = p + "/" + dir;

            if (null == zooKeeper.exists(p, false)) {
                zooKeeper.create(path, new byte[]{},
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
            }
        }
    }

    public String create(String path, byte[] data) throws KeeperException,
            InterruptedException {
        return create(path, data, CreateMode.PERSISTENT);
    }

    public String create(String path, byte[] data, CreateMode mode)
            throws KeeperException, InterruptedException {
        return zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
    }

    public String getStringData(String path) throws KeeperException,
            InterruptedException {

        try {
            return new String(zooKeeper.getData(path, false, null));
        } catch (KeeperException.NoNodeException ke) {

            return null;
        }
    }

    public Map<String, String> loadMap(String path)
            throws InterruptedException, KeeperException {
        List<String> ss = getChildren(path);

        Map<String, String> v = new TreeMap();
        for (String s : ss) {
            v.put(s, getStringData(path + "/" + s));
        }
        return v;

    }

    /**
     * DO NOT cache ZooKeeper Object!!!
     *
     * @return
     */
    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public static void main(String[] ss) throws IOException,
            InterruptedException, KeeperException {
        ZKClient.get().forceDelete("/search0");
        ZKClient.get().forceDelete("/SearchMan/search0");
        //
        // final ZKClient client = ZKClient.get();
        // Thread t=new Thread(new Runnable()
        // {
        //
        // public void run()
        // {
        // try
        // {
        // Thread.currentThread().sleep(Integer.MAX_VALUE);
        // }
        // catch (InterruptedException e)
        // {
        // }
        // }
        // });
        //
        // t.start();
        // // String s= client.dump("/", 1);
        // // System.out.println(s);
        // client.watchChildren("/SearchMan/search3/MemNodes", new
        // ChildrenWatcher()
        // {
        //
        // public void nodeAdded(String node)
        // {
        // System.out.println("nodeAdded:" + node + "--" +
        // Thread.currentThread().toString());
        // }
        //
        // public void nodeRemoved(String node)
        // {
        // System.out.println("nodeRemoved:" + node + "--" +
        // Thread.currentThread().toString());
        // }
        // });
        //
        // client.watchLongValueNode("/SearchMan/search3", new
        // LongValueWatcher(){
        // public void valueChaned(long l)
        // {
        // System.out.println("valueChaned:"+l);
        // }
        // });
        //
        // client.watchStrValueNode("/SearchMan/search3", new
        // ZKClient.StringValueWatcher(){
        //
        // public void valueChaned(String l)
        // {
        // System.out.println("valueChaneds:"+l);
        // }
        // });
        // client.watchMaster("/SearchMan/search3", "11", new MasterWatcher()
        // {
        // public void masterChangeTo(String serverName)
        // {
        // System.out.println("master:"+serverName);
        // }
        //
        // public void exceptionCaught(Throwable t)
        // {
        // t.printStackTrace();
        // }
        // });
        //
        // new Thread()
        // {
        // public void run()
        // {
        // try
        // {
        // Thread.currentThread().sleep(1000);
        // }
        // catch (InterruptedException e)
        // {
        // }
        // for (int i = 0; i < 10000; i++)
        // try
        // {
        // // System.out.println("cheack create "+i);
        // // CLIENT.createIfNotExist("/SearchMan/search3/MemNodes/"+i );
        // }
        // catch (Exception e)
        // {
        // e.printStackTrace();
        // }
        // }
        // }.start();
        //
        // new Thread()
        // {
        // public void run()
        // {
        // for (int i = 0; i < 10000; i++)
        // try
        // {
        // System.out.println("cheack create " + i);
        // CLIENT.forceDelete("/SearchMan/search3/MemNodes/" + i);
        // }
        // catch (Exception e)
        // {
        // e.printStackTrace();
        // }
        // }
        // }.start();

        // String [] paths= "/sss/sfsfsd/dfsd".split("/" );
        // for(String s:paths)
        // {
        // System.out.println(s);
        // }
        // MasterWatcher w = new MasterWatcher()
        // {
        //
        // public void masterChangeTo(String serverName)
        // {
        // //System.out.println("Master is:" + serverName);
        // }
        //
        // public void exceptionCaught(Throwable t)
        // {
        // }
        // };
        //
        // ZKClient.get().watchMaster("/test/tt", "localhost:8080", w);

    }

    public Stat setData(final String path, final byte data[])
            throws KeeperException, InterruptedException {
        return zooKeeper.setData(path, data, -1);
    }

    public void getDataFromZooKeeper(String localPath, String ZkPath)
            throws IOException, KeeperException, InterruptedException {
        if (zooKeeper.exists(ZkPath.replaceAll("/+", "/"), false) == null)
            throw new RuntimeException("The specific path does not exist!");
        List<String> childrenList = getChildren(ZkPath.replaceAll("/+", "/"));
        if (childrenList.size() == 0)
            throw new RuntimeException("Node not found!");
        for (String child : childrenList) {
            List<String> tmpList = getChildren((ZkPath + "/" + child)
                    .replaceAll("/+", "/"));
            if (!tmpList.isEmpty()) {
                File folder = new File(localPath + "/" + child);
                if (!folder.isDirectory())
                    folder.mkdirs();
                getDataFromZooKeeper(localPath + "/" + child,
                        (ZkPath + "/" + child).replaceAll("/+", "/"));
            } else {
                BufferedOutputStream writer = new BufferedOutputStream(
                        new FileOutputStream(localPath + "/" + child));
                byte[] d = getData((ZkPath + "/" + child).replaceAll("/+", "/"));
                if (d != null)
                    writer.write(d);
                writer.close();
            }
        }
    }

    public void uploadDataFromDir(String localPath, String ZkPath)
            throws KeeperException, InterruptedException {
        File dir = new File(localPath);
        if (!dir.exists())
            throw new RuntimeException("The specific path does not exist!");
        createIfNotExist(ZkPath.replaceAll("/+", "/"));
        if (dir.isFile()) {
            try {
                String newPath = new String(localPath);
                BufferedInputStream reader = new BufferedInputStream(
                        new FileInputStream(newPath));
                byte[] buffer = new byte[(int) dir.length()];
                reader.read(buffer, 0, buffer.length);
                setData(ZkPath.replaceAll("/+", "/"), buffer);
                reader.close();
            } catch (Exception e) {
//                log.error("write data to ZK error", e);
            }
        } else {
            File[] indexes = dir.listFiles();
            for (File index : indexes) {
                String nameString = index.getName();
                createIfNotExist((ZkPath + "/" + nameString).replaceAll("/+",
                        "/"));
                if (index.isDirectory()) {
                    uploadDataFromDir(localPath + "/" + nameString, (ZkPath
                            + "/" + nameString).replaceAll("/+", "/"));
                } else {
                    try {
                        String newPath = new String(localPath + "/"
                                + nameString);
                        BufferedInputStream reader = new BufferedInputStream(
                                new FileInputStream(newPath));
                        byte[] buffer = new byte[(int) index.length()];
                        reader.read(buffer, 0, buffer.length);
                        setData((ZkPath + "/" + nameString).replaceAll("/+",
                                "/"), buffer);
                        reader.close();
                    } catch (Exception e) {
//                        log.error("write data to ZK error?", e);
                    }
                }
            }
        }
    }

    public String upsertNode(String path, byte[] data) throws KeeperException,
            InterruptedException {
        if (zooKeeper.exists(path, false) != null) {
            setData(path, data);
            return "updated";
        } else {
            create(path, data);
            return "inserted";
        }
    }

    public String getConnAddress() {
        return StringUtils.join(connectAddress, ",");
    }

}
